#include <sys/time.h>
#include <time.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_set>

#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

#include "hello.pb.h"
#include "hello.grpc.pb.h"

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerWriter;
using grpc::Status;

using hello::HelloService;
using hello::HelloMsg;
// Completion-queue tag type. In this file every tag handed to gRPC is the address of a
// std::function<void(bool)> object that was bound to a member function of a class instance;
// the bool argument is the `ok` flag reported by the completion queue.
using TagType = std::function<void(bool)>;

using namespace std;

// Forward declaration: the registry below stores pointers to this class before it is defined.
class GrpcStreamServerInstance;
// Address (host:port) handed to ServerBuilder::AddListeningPort().
static const std::string server_address = "0.0.0.0:8860";
// Registry of the GrpcStreamServerInstance objects whose stream is currently connected;
// used to broadcast messages and to report the number of connections.
static unordered_set<GrpcStreamServerInstance*> GrpcStreamServerInstanceSet;
/**
 * Callback interface for the life-cycle events of one gRPC stream.
 *
 * Each callback receives the `ok` flag that the completion queue reported
 * together with the corresponding tag.
 */
class GrpcStreamServerInterface {
public:
    // Polymorphic base: a virtual destructor makes destruction through a
    // base-class pointer well defined.
    virtual ~GrpcStreamServerInterface() = default;

    virtual void connected(bool ok) = 0;  // a new connection was accepted by the server
    virtual void readDone(bool ok) = 0;   // one new message has been read
    virtual void writeDone(bool ok) = 0;  // one message has been written to the client
    virtual void disconnect(bool ok) = 0; // the stream was closed, no matter which side initiated it
};

// Alias for the generated asynchronous service class of hello::HelloService.
// The spelling "ServeiceType" is kept because the rest of the file refers to it by this name.
using ServeiceType = hello::HelloService::AsyncService;

class GrpcStreamServerInstance : public GrpcStreamServerInterface{
private:
   
public:
    GrpcStreamServerInstance(ServeiceType* service,grpc::ServerCompletionQueue* inputCq);
    virtual ~GrpcStreamServerInstance(){};
    void connected(bool ok) override;
    void readDone(bool ok) override;
    void writeDone(bool ok) override;
    void disconnect(bool ok) override;
    bool asycSendMsg(HelloMsg& msg);
private:
    ServeiceType* service;
    grpc::ServerCompletionQueue* cq;//Completion Queue
    ServerContext serverContext;//每一个stream都有自己的serverContext
    ServerAsyncReaderWriter<HelloMsg, HelloMsg> stream;
    //函数指针
    TagType connectedFunc;//新链接接入时触发
    TagType readDoneFunc;//读到新消息时触发
    TagType writeDoneFunc;//发送一帧消息成功后触发
    TagType disconnectFunc;//stream断开时触发
    //inputMsg用来接收消息，用stream.Read()中绑定
    HelloMsg inputMsg;    
    //onWrite用来区分stream有没有在发送消息，如果stream在发送，则只需要将消息写入writeBuffer，
    //否则要使用stream.Write()触发stream的发送;
    bool onWrite;
    //写缓存
    queue<HelloMsg> writeBuffer;
};

//GrpcStreamServerInstance不需要其它线程交互，故不需要互斥锁
GrpcStreamServerInstance::GrpcStreamServerInstance(ServeiceType* inputService,grpc::ServerCompletionQueue* inputCq):\
service(inputService),cq(inputCq),stream(&serverContext)
{
    //使用std::bind绑定对象和类对象函数得到一个函数指针
    connectedFunc = std::bind(&GrpcStreamServerInstance::connected, this, std::placeholders::_1);
    readDoneFunc = std::bind(&GrpcStreamServerInstance::readDone, this, std::placeholders::_1);
    writeDoneFunc = std::bind(&GrpcStreamServerInstance::writeDone, this, std::placeholders::_1);
    disconnectFunc = std::bind(&GrpcStreamServerInstance::disconnect, this, std::placeholders::_1);
    
    //设置serverContext，stream断开时，CompletionQueue会返回一个tag，这个tag就是输入的disconnectFunc这个函数指针
    serverContext.AsyncNotifyWhenDone(&disconnectFunc);
    //设置当新新链接connect的时候，cq返回connectedFunc作为tag
    service->Requesthello(&serverContext,&stream, cq,cq,&connectedFunc);

    onWrite = false;
}
void GrpcStreamServerInstance::connected(bool ok){
    //新建一个GrpcStreamServerInstance，一个client的grpc链接就对应一个GrpcStreamServerInstance实例
    stream.Read(&inputMsg,&readDoneFunc);
    //新的GrpcStreamServerInstance，会在构造函数中调用service->Requesthello()来绑定新链接
    new GrpcStreamServerInstance(service,cq);
    //新加入的stream对应的GrpcStreamServerInstance会被加入到GrpcStreamServerInstanceSet中，用于发送消息或者统计stream链接。
    GrpcStreamServerInstanceSet.insert(this);
    cout << "当前链接数量为"<< GrpcStreamServerInstanceSet.size()<< endl;  
}   

void GrpcStreamServerInstance::readDone(bool ok){
    try{
        if(!ok){
            //当ok == false，说明stream已经断开
            return;
        }
        cout << "收到消息,id为"<< inputMsg.id() <<",msg为" << inputMsg.msg() << endl;  
        stream.Read(&inputMsg,&readDoneFunc);
    }catch(const std::exception& e){     
        cout << e.what() << endl;   
    }
}
void GrpcStreamServerInstance::writeDone(bool ok){
    if(!ok){
        //当ok == false，说明stream已经断开
        return;
    }
    onWrite = false;
    if(writeBuffer.empty())return;
    //当grpc写完时，会触发writeDone，我们只需要从自定义的writeBuffer中取一帧继续写即可
    stream.Write(std::move(writeBuffer.front()),&writeDoneFunc);
    writeBuffer.pop();
    onWrite = true;
}

void GrpcStreamServerInstance::disconnect(bool ok){ 
    GrpcStreamServerInstanceSet.erase(this);
    cout << "链接断开,当前链接数量为"<< GrpcStreamServerInstanceSet.size()<< endl; 
    delete this;
}
bool GrpcStreamServerInstance::asycSendMsg(HelloMsg& msg){
    writeBuffer.push(msg);
    if(!onWrite){
        //没有任何写操作在执行
        onWrite = true;
        stream.Write(std::move(writeBuffer.front()),&writeDoneFunc);
        writeBuffer.pop();
    }
    return true;
}


/**
 * Owns the server loop (run()) and a queue through which other threads hand
 * messages to that loop (sendMsg()).
 */
class GrpcStreamServerThread {
public:
    GrpcStreamServerThread() = default;
    ~GrpcStreamServerThread();

    // Server loop; see the definition for details.
    void run();
    // Enqueues msg for the server loop; returns false if the lock could not be taken in time.
    bool sendMsg(const HelloMsg& msg);
    // Returns whether more than `ms` milliseconds lie between `last` and `now`.
    bool isTimeElapsed(struct timeval now,struct timeval last,int64_t ms);

private:
    queue<HelloMsg> msgQueue;   // messages waiting to be picked up by run()
    std::atomic_int msgNum{0};  // counter of enqueued messages, checked by run() without the lock
    timed_mutex writeLock;      // guards msgQueue
};

/**
 * Tells whether more than `ms` milliseconds have passed between two timestamps.
 *
 * The difference is computed in 64-bit microseconds. This avoids two problems of
 * a millisecond-based computation: the microsecond part being truncated toward
 * zero on its own (which rounds up or down depending on its sign), and the
 * seconds difference being multiplied in the width of time_t.
 *
 * @param now  later timestamp
 * @param last earlier timestamp
 * @param ms   threshold in milliseconds
 * @return true if (now - last) is strictly greater than ms milliseconds
 */
bool GrpcStreamServerThread::isTimeElapsed(struct timeval now,struct timeval last,int64_t ms){
    const int64_t secondsDiff = static_cast<int64_t>(now.tv_sec) - static_cast<int64_t>(last.tv_sec);
    const int64_t microsDiff = static_cast<int64_t>(now.tv_usec) - static_cast<int64_t>(last.tv_usec);
    const int64_t elapsedMicros = secondsDiff * 1000000 + microsDiff;
    return elapsedMicros > ms * 1000;
}

void GrpcStreamServerThread::run(){
    try{
        std::unique_ptr<grpc::ServerCompletionQueue> cq;
        ServeiceType service;
        ServerBuilder builder;

        // builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 10000);
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service);
        cq = builder.AddCompletionQueue();
        std::unique_ptr<Server> server_= builder.BuildAndStart();
        new GrpcStreamServerInstance(&service,cq.get());

        struct timeval lastTime = {0,0};//用来记录时间，保证下面定时消息的发送
        while (true) {
            void * tag;
            bool ok;
            //阻塞100毫秒，gpr_time_from_millis()函数的单位是毫秒,输入的是tag和ok的地址，cq->AsyncNext()会把结果写到地址对应的内存上
            grpc::ServerCompletionQueue::NextStatus status = cq->AsyncNext(&tag, &ok,\
            gpr_time_from_millis(100,GPR_TIMESPAN));

            if(status ==  grpc::ServerCompletionQueue::NextStatus::GOT_EVENT){
                //grpc服务器有新的事件，强制转换tag从void * 到 std::function<void(bool)> *，即void *(bool) 函数指针
                TagType* functionPointer = reinterpret_cast<TagType*>(tag);
                //通过函数指针functionPointer调用函数GrpcStreamServerInstance::xxx
                (*functionPointer)(ok);
            }
            //从msgQueue中取出新的消息，发送各个client
            if( msgNum != 0){
                std::unique_lock<timed_mutex>lock(writeLock,std::defer_lock);
                chrono::milliseconds  tryTime(500);
                if(lock.try_lock_for(tryTime)){
                    //获取到了锁
                    while(!msgQueue.empty()){
                        HelloMsg msg = msgQueue.front();
                        for(auto temp : GrpcStreamServerInstanceSet){
                            temp->asycSendMsg(msg);
                        }
                        msgQueue.pop();
                    }
                }else{
                    cout << "500ms内没抢到锁" << endl;  
                    continue;
                }
            }
            //定时发送消息
            struct timeval now;
            gettimeofday(&now, NULL);
            if(isTimeElapsed(now,lastTime,10*1000)){//判断是否已经过了10秒
                lastTime = now;
                HelloMsg msg;
                msg.set_id(2);
                msg.set_msg("hello world");
                for(auto temp : GrpcStreamServerInstanceSet){
                    temp->asycSendMsg(msg);
                }
            }
            //此处可以加入一些自定义的处理函数，比如记录时间等等，但是不应该阻塞太久。
        }
    }catch(const std::exception& e){        
        cout << e.what() << endl;   
    }
}
// Nothing to release explicitly: all members clean up after themselves.
GrpcStreamServerThread::~GrpcStreamServerThread() = default;


bool GrpcStreamServerThread::sendMsg(const  HelloMsg& msg){
    {
        std::unique_lock<timed_mutex>lock(writeLock,std::defer_lock);
        chrono::milliseconds  tryTime(500);
        //main函数与GrpcStreamServerThread::run()处于不同的线程，需要加锁保证线程安全
        if(lock.try_lock_for(tryTime)){
            //获取到了锁
            msgQueue.push(msg);
            msgNum++;
        }else{
            cout << "500ms内没抢到锁" << endl;  
            return false;
        }
    }
    return true; 
}


int main(){    
    GrpcStreamServerThread* grpcStreamServerThread = new GrpcStreamServerThread();
    thread myThread(std::bind(&GrpcStreamServerThread::run, grpcStreamServerThread));
    HelloMsg msg;
    msg.set_id(1);

    while(1){
        std::string input;
        cin >> input;
        //输入exit，跳出循环结束程序。
        if(input == "exit")break;
        msg.set_msg(input);
        grpcStreamServerThread->sendMsg(msg);
    }
    
    return 0;
}
